package com.gitee.charlesvhe.nifi.processors;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.db.JdbcCommon;

import java.io.BufferedInputStream;
import java.io.InputStream;
import java.sql.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

public class JsonToDatabaseProcessor extends AbstractProcessor {
    // Reserved top-level keys in the incoming JSON document; they carry routing
    // metadata rather than column values.
    public static final String OPERATE_KEY = "_operate";
    public static final String CATALOG_KEY = "_catalog";
    public static final String SCHEMA_KEY = "_schema";
    public static final String TABLE_KEY = "_table";

    // Single-letter operation codes carried in the "_operate" field:
    // create / read / update / delete.
    private static final String OPERATE_CREATE = "c";
    private static final String OPERATE_READ = "r";
    private static final String OPERATE_UPDATE = "u";
    private static final String OPERATE_DELETE = "d";

    // Allowable values for the UNMATCHED_FIELD_BEHAVIOR property.
    static final AllowableValue IGNORE_UNMATCHED_FIELD = new AllowableValue("Ignore Unmatched Fields", "Ignore Unmatched Fields",
            "Any field in the JSON document that cannot be mapped to a column in the database is ignored");
    static final AllowableValue FAIL_UNMATCHED_FIELD = new AllowableValue("Fail", "Fail",
            "If the JSON document has any field that cannot be mapped to a column in the database, the FlowFile will be routed to the failure relationship");
    // Allowable values for the UNMATCHED_COLUMN_BEHAVIOR property.
    static final AllowableValue IGNORE_UNMATCHED_COLUMN = new AllowableValue("Ignore Unmatched Columns",
            "Ignore Unmatched Columns",
            "Any column in the database that does not have a field in the JSON document will be assumed to not be required.  No notification will be logged");
    static final AllowableValue WARNING_UNMATCHED_COLUMN = new AllowableValue("Warn on Unmatched Columns",
            "Warn on Unmatched Columns",
            "Any column in the database that does not have a field in the JSON document will be assumed to not be required.  A warning will be logged");
    static final AllowableValue FAIL_UNMATCHED_COLUMN = new AllowableValue("Fail on Unmatched Columns",
            "Fail on Unmatched Columns",
            "A flow will fail if any column in the database that does not have a field in the JSON document.  An error will be logged");

    // DBCP controller service supplying JDBC connections; required.
    static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder()
            .name("JDBC Connection Pool")
            .description("Specifies the JDBC Connection Pool to use in order to convert the JSON message to a SQL statement. "
                    + "The Connection Pool is necessary in order to determine the appropriate database column types.")
            .identifiesControllerService(DBCPService.class)
            .required(true)
            .build();
    static final PropertyDescriptor UNMATCHED_FIELD_BEHAVIOR = new PropertyDescriptor.Builder()
            .name("Unmatched Field Behavior")
            .description("If an incoming JSON element has a field that does not map to any of the database table's columns, this property specifies how to handle the situation")
            .allowableValues(IGNORE_UNMATCHED_FIELD, FAIL_UNMATCHED_FIELD)
            .defaultValue(IGNORE_UNMATCHED_FIELD.getValue())
            .build();
    static final PropertyDescriptor UNMATCHED_COLUMN_BEHAVIOR = new PropertyDescriptor.Builder()
            .name("Unmatched Column Behavior")
            .description("If an incoming JSON element does not have a field mapping for all of the database table's columns, this property specifies how to handle the situation")
            .allowableValues(IGNORE_UNMATCHED_COLUMN, WARNING_UNMATCHED_COLUMN, FAIL_UNMATCHED_COLUMN)
            .defaultValue(FAIL_UNMATCHED_COLUMN.getValue())
            .build();

    static final PropertyDescriptor QUOTED_COLUMN_IDENTIFIERS = new PropertyDescriptor.Builder()
            .name("jts-quoted-identifiers")
            .displayName("Quote Column Identifiers")
            .description("Enabling this option will cause all column names to be quoted, allowing you to "
                    + "use reserved words as column names in your tables.")
            .allowableValues("true", "false")
            .defaultValue("false")
            .build();

    static final PropertyDescriptor QUOTED_TABLE_IDENTIFIER = new PropertyDescriptor.Builder()
            .name("jts-quoted-table-identifiers")
            .displayName("Quote Table Identifiers")
            .description("Enabling this option will cause the table name to be quoted to support the "
                    + "use of special characters in the table name")
            .allowableValues("true", "false")
            .defaultValue("false")
            .build();

    // FlowFiles whose statement was built and executed successfully.
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").build();
    static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("A FlowFile is routed to this relationship if it cannot be converted into a SQL statement. Common causes include invalid JSON "
                    + "content or the JSON content missing a required field (if using an INSERT statement type).")
            .build();

    // Table metadata cache keyed by (catalog, table); filled lazily in onTrigger
    // and cleared in onUnscheduled.
    private ConcurrentHashMap<SchemaKey, TableSchema> schemaCache = new ConcurrentHashMap<>();
    // Snapshots of property values, refreshed in onScheduled.
    // NOTE(review): ignoreUnmappedFields / ignoreUnmappedColumns are assigned but
    // never read by the visible code — confirm whether the unmatched-field/column
    // behaviors were meant to be enforced in onTrigger.
    private boolean ignoreUnmappedFields = true;
    private boolean ignoreUnmappedColumns = true;
    private boolean quotedTable = true;
    private boolean quotedColumn = true;
    private DBCPService dbcpService = null;


    /**
     * Declares the supported properties; the order here is the order shown in
     * the NiFi UI.
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return new ArrayList<>(Arrays.asList(
                CONNECTION_POOL,
                UNMATCHED_FIELD_BEHAVIOR,
                UNMATCHED_COLUMN_BEHAVIOR,
                QUOTED_COLUMN_IDENTIFIERS,
                QUOTED_TABLE_IDENTIFIER));
    }

    /**
     * Declares the processor's relationships: success and failure.
     */
    @Override
    public Set<Relationship> getRelationships() {
        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        return relationships;
    }

    /**
     * Snapshots the configured property values into fields so onTrigger does not
     * have to re-read them for every FlowFile.
     */
    @OnScheduled
    public void onScheduled(final ProcessContext context) {
        final String fieldBehavior = context.getProperty(UNMATCHED_FIELD_BEHAVIOR).getValue();
        ignoreUnmappedFields = IGNORE_UNMATCHED_FIELD.getValue().equalsIgnoreCase(fieldBehavior);

        final String columnBehavior = context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue();
        ignoreUnmappedColumns = IGNORE_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(columnBehavior);

        quotedTable = context.getProperty(QUOTED_TABLE_IDENTIFIER).asBoolean();
        quotedColumn = context.getProperty(QUOTED_COLUMN_IDENTIFIERS).asBoolean();
        dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class);
        // NOTE(review): the two ignoreUnmapped* flags captured above are not
        // consulted anywhere in the visible code — confirm intent.
    }

    /**
     * Drops all cached table metadata so that schema changes (e.g. ALTER TABLE)
     * are picked up the next time the processor runs.
     */
    @OnUnscheduled
    public void onUnscheduled(final ProcessContext context) {
        schemaCache.clear();
    }

    /**
     * Converts one JSON FlowFile into a single INSERT, UPDATE or DELETE
     * statement and executes it via the configured connection pool.
     *
     * <p>The document's {@code _operate} field selects the statement type
     * ("c"/"u"/"d"; "r" is passed straight through), {@code _catalog} /
     * {@code _schema} / {@code _table} locate the target table, and the
     * remaining top-level fields supply column values. FlowFiles that cannot be
     * parsed, lack required metadata or column values, or whose statement fails
     * to execute are routed to failure; everything else goes to success.
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        final FlowFile flowFile = session.get();
        if (flowFile == null) {
            // Nothing queued right now; the framework will call us again.
            return;
        }

        // Parse the JSON document from the FlowFile content.
        final ObjectMapper mapper = new ObjectMapper();
        final AtomicReference<JsonNode> rootNodeRef = new AtomicReference<>(null);
        try {
            session.read(flowFile, in -> {
                try (final InputStream bufferedIn = new BufferedInputStream(in)) {
                    rootNodeRef.set(mapper.readTree(bufferedIn));
                }
            });
        } catch (final ProcessException pe) {
            getLogger().error("Failed to parse {} as JSON due to {}; routing to failure", new Object[]{flowFile, pe.toString()}, pe);
            session.transfer(flowFile, REL_FAILURE);
            return;
        }

        final JsonNode rootNode = rootNodeRef.get();
        // Guard the required metadata keys: a missing key previously caused an
        // NPE, which rolls the FlowFile back and retries it forever instead of
        // routing it to failure.
        if (rootNode == null || !rootNode.has(OPERATE_KEY) || !rootNode.has(TABLE_KEY)) {
            getLogger().error("{} is missing required field {} or {}; routing to failure", new Object[]{flowFile, OPERATE_KEY, TABLE_KEY});
            session.transfer(flowFile, REL_FAILURE);
            return;
        }
        final String operate = rootNode.get(OPERATE_KEY).asText();
        final String catalog = rootNode.has(CATALOG_KEY) ? rootNode.get(CATALOG_KEY).asText() : null;
        final String schema = rootNode.has(SCHEMA_KEY) ? rootNode.get(SCHEMA_KEY).asText() : null;
        final String table = rootNode.get(TABLE_KEY).asText();

        if (OPERATE_READ.equals(operate)) {
            // Reads have no database side effect; pass the document through.
            session.transfer(flowFile, REL_SUCCESS);
            return;
        }

        // Look up (or fetch and cache) the target table's metadata.
        final SchemaKey schemaKey = new SchemaKey(catalog, table);
        TableSchema tableSchema = schemaCache.get(schemaKey);
        if (tableSchema == null) {
            try (final Connection conn = dbcpService.getConnection(flowFile.getAttributes())) {
                tableSchema = TableSchema.from(conn, catalog, schema, table);
                schemaCache.put(schemaKey, tableSchema);
            } catch (final SQLException e) {
                getLogger().error("Failed to fetch table metadata for {} due to {}; routing to failure", new Object[]{flowFile, e.toString()}, e);
                session.transfer(flowFile, REL_FAILURE);
                return;
            }
        }

        // Build the (optionally schema-qualified and quoted) table reference.
        StringBuilder tableName = new StringBuilder(64);
        if (StringUtils.isNotBlank(schema)) {
            tableName.append(schema).append(".");
        }
        if (quotedTable) {
            tableName.append("\"").append(table).append("\"");
        } else {
            tableName.append(table);
        }

        // Column name/value pairs in the order their '?' placeholders appear.
        List<String[]> columnValue = new ArrayList<>();
        StringBuilder sql = new StringBuilder(512);

        if (OPERATE_CREATE.equals(operate)) {
            // INSERT requires a value for every column of the table
            // (fail-on-unmatched-column behavior).
            sql.append("INSERT INTO ").append(tableName).append(" (");
            StringBuilder values = new StringBuilder(128);
            values.append(" VALUES (");
            boolean hasBefore = false;
            for (String columnName : tableSchema.getColumns().keySet()) {
                if (!rootNode.has(columnName)) {
                    getLogger().error("{} column {} no value, {}; routing to failure", new Object[]{tableName, columnName, flowFile});
                    session.transfer(flowFile, REL_FAILURE);
                    return;
                }
                columnValue.add(new String[]{columnName, rootNode.get(columnName).asText()});
                if (hasBefore) {
                    sql.append(", ");
                    values.append(", ");
                }
                appendColumnName(columnName, sql, tableSchema);
                values.append("?");
                hasBefore = true;
            }
            sql.append(") ");
            values.append(") ");

            sql.append(values);
        } else if (OPERATE_UPDATE.equals(operate)) {
            sql.append("UPDATE ").append(tableName).append(" SET ");
            boolean hasBefore = false;
            for (String columnName : tableSchema.getColumns().keySet()) {
                if (!rootNode.has(columnName)) {
                    getLogger().error("{} column {} no value, {}; routing to failure", new Object[]{tableName, columnName, flowFile});
                    session.transfer(flowFile, REL_FAILURE);
                    return;
                }
                columnValue.add(new String[]{columnName, rootNode.get(columnName).asText()});
                if (hasBefore) {
                    sql.append(", ");
                }
                appendColumnName(columnName, sql, tableSchema);
                sql.append("=? ");
                hasBefore = true;
            }

            if (!appendPkWhereCondition(rootNode, tableSchema, tableName, sql, columnValue)) {
                session.transfer(flowFile, REL_FAILURE);
                return;
            }
        } else if (OPERATE_DELETE.equals(operate)) {
            if (tableSchema.getPrimaryKeyColumnNames().isEmpty()) {
                getLogger().error("{} no PK, {}; routing to failure", new Object[]{tableName, flowFile});
                session.transfer(flowFile, REL_FAILURE);
                return;
            }

            sql.append("DELETE FROM ").append(tableName);

            if (!appendPkWhereCondition(rootNode, tableSchema, tableName, sql, columnValue)) {
                session.transfer(flowFile, REL_FAILURE);
                return;
            }
        } else {
            // Previously an unknown code fell through and tried to execute an
            // empty statement; fail fast with a clear message instead.
            getLogger().error("{} has unsupported {} value {}; routing to failure", new Object[]{flowFile, OPERATE_KEY, operate});
            session.transfer(flowFile, REL_FAILURE);
            return;
        }
        getLogger().info("SQL: {}", new Object[]{sql.toString()});

        try (final Connection conn = dbcpService.getConnection(flowFile.getAttributes());
             final PreparedStatement ps = conn.prepareStatement(sql.toString())) {
            // JDBC parameter indices are 1-based. The original code passed a
            // constant 0 and never incremented it, so every bind was invalid;
            // bind each value at its own incrementing index instead.
            int index = 1;
            for (String[] cv : columnValue) {
                ColumnDescription cd = tableSchema.getColumns().get(cv[0]);
                JdbcCommon.setParameter(ps, cv[0], index++, cv[1], cd.getDataType(), "");
            }
            ps.executeUpdate();
        } catch (final Exception e) {
            getLogger().error("Failed to execute SQL for {} due to {}; routing to failure", new Object[]{flowFile, e.toString()}, e);
            session.transfer(flowFile, REL_FAILURE);
            return;
        }
        session.transfer(flowFile, REL_SUCCESS);
    }

    /**
     * Appends a {@code WHERE pk1=? AND pk2=?} clause covering every primary-key
     * column, recording each bound value in {@code columnValue}.
     *
     * @return true when the clause was fully built; false when the table has no
     *         primary key or the document lacks a value for one of the PK
     *         columns (caller routes the FlowFile to failure)
     */
    private boolean appendPkWhereCondition(JsonNode rootNode, TableSchema tableSchema, StringBuilder tableName, StringBuilder sql, List<String[]> columnValue) {
        if (tableSchema.getPrimaryKeyColumnNames().isEmpty()) {
            // Without this guard an empty PK set produced "... WHERE" — invalid
            // SQL — for UPDATE statements (DELETE checked, UPDATE did not).
            getLogger().error("{} no PK, {}; routing to failure", new Object[]{tableName, rootNode});
            return false;
        }
        sql.append(" WHERE ");
        boolean hasBefore = false;
        for (String columnName : tableSchema.getPrimaryKeyColumnNames()) {
            if (!rootNode.has(columnName)) {
                getLogger().error("{} PK {} no value, {}; routing to failure", new Object[]{tableName, columnName, rootNode});
                return false;
            }
            columnValue.add(new String[]{columnName, rootNode.get(columnName).asText()});
            if (hasBefore) {
                sql.append(" AND ");
            }
            appendColumnName(columnName, sql, tableSchema);
            sql.append("=? ");
            hasBefore = true;
        }
        return true;
    }

    /**
     * Appends the column name to {@code sql}, wrapped in the database's
     * identifier quote string when column quoting is enabled.
     */
    private void appendColumnName(String columnName, StringBuilder sql, TableSchema tableSchema) {
        final String quote = quotedColumn ? tableSchema.getQuotedIdentifierString() : "";
        sql.append(quote).append(columnName).append(quote);
    }


    /**
     * Cached snapshot of a table's metadata: columns (in database order),
     * primary-key column names and the driver's identifier quote string.
     */
    private static class TableSchema {
        // Columns that must be given a value on INSERT (NOT NULL, no default,
        // not auto-increment); see ColumnDescription.isRequired().
        private List<String> requiredColumnNames;
        private Set<String> primaryKeyColumnNames;
        // Keyed by column name; LinkedHashMap keeps the database's column order
        // so generated statements list columns deterministically (the original
        // HashMap produced an arbitrary order).
        private Map<String, ColumnDescription> columns;
        // Quote string reported by the driver (e.g. " or `).
        private String quotedIdentifierString;

        private TableSchema(final List<ColumnDescription> columnDescriptions,
                            final Set<String> primaryKeyColumnNames, final String quotedIdentifierString) {
            this.columns = new LinkedHashMap<>();
            this.primaryKeyColumnNames = primaryKeyColumnNames;
            this.quotedIdentifierString = quotedIdentifierString;

            this.requiredColumnNames = new ArrayList<>();
            for (final ColumnDescription desc : columnDescriptions) {
                columns.put(desc.columnName, desc);
                if (desc.isRequired()) {
                    requiredColumnNames.add(desc.columnName);
                }
            }
        }

        public Map<String, ColumnDescription> getColumns() {
            return columns;
        }

        public List<String> getRequiredColumnNames() {
            return requiredColumnNames;
        }

        public Set<String> getPrimaryKeyColumnNames() {
            return primaryKeyColumnNames;
        }

        public String getQuotedIdentifierString() {
            return quotedIdentifierString;
        }

        /**
         * Reads column and primary-key metadata for the given table.
         *
         * @throws SQLException if a metadata query fails
         */
        public static TableSchema from(final Connection conn, final String catalog, final String schema, final String tableName) throws SQLException {
            final DatabaseMetaData dmd = conn.getMetaData();

            try (final ResultSet colRs = dmd.getColumns(catalog, schema, tableName, "%")) {
                final List<ColumnDescription> cols = new ArrayList<>();
                while (colRs.next()) {
                    cols.add(ColumnDescription.from(colRs));
                }

                final Set<String> primaryKeyColumns = new HashSet<>();
                // Pass the schema through (the original passed null): with a
                // null schema, a same-named table in another schema could
                // contribute its primary-key columns to this one.
                try (final ResultSet rs = dmd.getPrimaryKeys(catalog, schema, tableName)) {
                    while (rs.next()) {
                        primaryKeyColumns.add(rs.getString("COLUMN_NAME"));
                    }
                }

                return new TableSchema(cols, primaryKeyColumns, dmd.getIdentifierQuoteString());
            }
        }
    }

    /**
     * Metadata for a single table column: name, JDBC type, whether a value is
     * required on INSERT, and the reported column size (null when unknown).
     */
    private static class ColumnDescription {
        private final String columnName;
        // A java.sql.Types constant reported by the driver.
        private final int dataType;
        // true when NOT NULL, no default value and not auto-increment.
        private final boolean required;
        private final Integer columnSize;

        private ColumnDescription(final String columnName, final int dataType, final boolean required, final Integer columnSize) {
            this.columnName = columnName;
            this.dataType = dataType;
            this.required = required;
            this.columnSize = columnSize;
        }

        public int getDataType() {
            return dataType;
        }

        public Integer getColumnSize() {
            return columnSize;
        }

        public String getColumnName() {
            return columnName;
        }

        public boolean isRequired() {
            return required;
        }

        /**
         * Builds a ColumnDescription from the current row of a
         * DatabaseMetaData.getColumns result set.
         */
        public static ColumnDescription from(final ResultSet resultSet) throws SQLException {
            final ResultSetMetaData md = resultSet.getMetaData();
            List<String> columns = new ArrayList<>();

            for (int i = 1; i < md.getColumnCount() + 1; i++) {
                columns.add(md.getColumnName(i));
            }
            // COLUMN_DEF must be read first to work around Oracle bug, see NIFI-4279 for details
            final String defaultValue = resultSet.getString("COLUMN_DEF");
            final String columnName = resultSet.getString("COLUMN_NAME");
            final int dataType = resultSet.getInt("DATA_TYPE");
            final int colSize = resultSet.getInt("COLUMN_SIZE");

            final String nullableValue = resultSet.getString("IS_NULLABLE");
            // Drivers may return null here; JDBC defines the empty string as
            // "nullability unknown", so treat both null and "" as nullable.
            // The original called isEmpty() on a possibly-null value (NPE).
            final boolean isNullable = nullableValue == null || nullableValue.isEmpty() || "YES".equalsIgnoreCase(nullableValue);

            // IS_AUTOINCREMENT is optional; some drivers omit the column entirely.
            String autoIncrementValue = "NO";
            if (columns.contains("IS_AUTOINCREMENT")) {
                autoIncrementValue = resultSet.getString("IS_AUTOINCREMENT");
            }

            final boolean isAutoIncrement = "YES".equalsIgnoreCase(autoIncrementValue);
            final boolean required = !isNullable && !isAutoIncrement && defaultValue == null;

            return new ColumnDescription(columnName, dataType, required, colSize == 0 ? null : colSize);
        }
    }

    /**
     * Cache key for the table-metadata cache: catalog + table name.
     * NOTE(review): the schema name is not part of the key, so two tables with
     * the same name in different schemas would share one cache entry — confirm
     * whether that is acceptable for the deployments this processor targets.
     */
    private static class SchemaKey {
        private final String catalog;
        private final String tableName;

        public SchemaKey(final String catalog, final String tableName) {
            this.catalog = catalog;
            this.tableName = tableName;
        }

        @Override
        public int hashCode() {
            // Objects.hash uses the same 31-based, null-as-zero combination the
            // previous hand-rolled implementation produced.
            return Objects.hash(catalog, tableName);
        }

        @Override
        public boolean equals(final Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            final SchemaKey other = (SchemaKey) obj;
            return Objects.equals(catalog, other.catalog)
                    && Objects.equals(tableName, other.tableName);
        }
    }
}
